import multiprocessing as mp
import os
import time
import random


def run_work(ID):
    """Worker run in a child process: report its task id and its pid."""
    print('Run process %s (%s)...' % (ID, os.getpid()))
    print('process %s is done' % (ID))


if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    print('Process will start.')
    record = []
    for i in range(3):
        # BUG fix: the original passed target=run_proc, which is never
        # defined; the worker defined above is run_work.
        p = mp.Process(target=run_work, args=(i,))
        p.start()
        record.append(p)
    # Wait for every child before declaring the run finished.
    for p in record:
        p.join()
    print('Process end.')
#### 结果为####
Parent process 6848. Process will start.
Run process 0 (4900)... process 0is done Run process 1 (5720)... process 1is done Run process 2 (6508)... process 2is done
def run(self):
    """Print the current wall-clock time five times.

    Pauses self.interval seconds between prints (the interval is set by
    the enclosing process class's constructor — not visible here).
    """
    n = 5
    while n > 0:
        print("the time is {0}".format(time.ctime()))
        time.sleep(self.interval)
        n -= 1
if __name__ == '__main__':
    # Start a single clock process that ticks every 3 seconds.
    p = ClockProcess(3)
    p.start()
#### 结果 #### #the time is Sat Sep 26 11:10:38 2015 #the time is Sat Sep 26 11:10:41 2015 #the time is Sat Sep 26 11:10:44 2015 #the time is Sat Sep 26 11:10:47 2015 #the time is Sat Sep 26 11:10:50 2015
#wait_for_event: starting #wait_for_event_timeout:starting #wait_for_event_timeout:e.is_set->False #main: event is set #wait_for_event: e.is_set()->True
def func(ID):
    """Pool worker: print its task id, simulate 3 s of work, report its pid."""
    # Fixed syntax: the original had a stray comma before the % operator
    # ( print("ID: %d", % ID) ), which does not parse.
    print("ID: %d" % ID)
    time.sleep(3)
    print("end of process %s" % os.getpid())
if __name__ == "__main__":
    # The pool keeps at most 3 worker processes alive; as soon as one
    # finishes, a queued job is dispatched to the freed slot.
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        # apply_async is non-blocking: all four jobs are submitted at once.
        pool.apply_async(func, (i,))
    print("starting the pool !!!")
    pool.close()  # must be called before join(); forbids new submissions
    pool.join()   # block until every queued job has finished
创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完毕后才空出一个进程处理对象3。因为非阻塞,主函数执行独立于进程的执行,所以运行完for循环后直接输出“starting the pool !!!”,主程序在pool.join()处等待各个进程的结束。若为阻塞的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#coding: utf-8 import multiprocessing import time
def func(ID):
    """Pool worker: print its task id, simulate 1 s of work, report its pid."""
    # Fixed syntax: the original had a stray comma before the % operator
    # ( print("ID: %d", % ID) ), which does not parse.
    print("ID: %d" % ID)
    time.sleep(1)
    print("end of process %s" % os.getpid())
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        # apply() blocks until each job completes, so the four jobs run
        # strictly one after another despite the pool having 3 workers.
        pool.apply(func, (i,))
    print("starting the pool !!!")
    pool.close()  # must be called before join(), otherwise join() raises
    pool.join()   # wait for all child processes to exit
    print("Sub-process(es) done.")
def func(ID):
    """Pool worker: print its id, simulate 1 s of work, return a result string.

    Returns "done<ID>" so the parent can collect results via AsyncResult.get().
    """
    print("ID: %d" % ID)
    time.sleep(1)
    print("end of process %s" % os.getpid())
    return "done" + str(ID)
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    # Fixed: xrange is Python 2 only; range is the Python 3 equivalent.
    for i in range(10):
        result.append(pool.apply_async(func, (i,)))
    pool.close()  # close before join(): no further submissions allowed
    pool.join()   # wait for all workers to finish
    for res in result:
        # get() returns the worker's return value ("done<i>").
        print(":::", res.get())
    print("Sub-process(es) done.")
class Task(object):
    """A multiplication job: calling the instance computes a * b."""

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        # Worker processes invoke the task as Task() to execute it.
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    flag = [0]

    # Start consumer processes
    num_process = multiprocessing.cpu_count()
    print('Creating %d consumers' % num_process)
    record = [work_process(tasks, results, flag[0]) for i in range(2)]
    # BUG fix: the original iterated over the work_process class itself
    # (for w in work_process); the instances to start live in `record`.
    for w in record:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i + 1))

    # One None "poison pill" per consumer lets workers exit cleanly.
    # NOTE(review): 4 pills are queued but only 2 consumers are started
    # above — the extras are harmless, but confirm the intended count.
    for i in range(4):
        tasks.put(None)

    flag[0] = 1
    # Wait for all of the tasks to finish
    tasks.join()

    # Drain and print one result per submitted job
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1